package com.lelebd.szt.tags;

import cn.hutool.db.Db;
import cn.hutool.db.Entity;
import com.alibaba.fastjson.JSONObject;
import com.lelebd.szt.bean.StationCount;
import com.lelebd.szt.bean.Swipe;
import com.lelebd.szt.config.DealType;
import com.lelebd.szt.config.SZTConfig;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.sql.SQLException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Properties;

public class StreamOfEnterAnalysis {

    /** Timestamp pattern of {@code Swipe.deal_date}, e.g. "2018-09-01 11:19:04". */
    private static final String DEAL_DATE_PATTERN = "yyyy-MM-dd HH:mm:ss";

    StreamExecutionEnvironment env = null;
    FlinkKafkaConsumer010<String> consumer = null;

    /**
     * Builds and executes the streaming job:
     * Kafka (JSON swipe records) -> parse to {@link Swipe} -> ascending
     * event-time watermarks derived from {@code deal_date} -> keep only
     * station-entry records -> per-station, per-day counters held in keyed
     * state -> rows persisted to the {@code szt_station_datecount} table each
     * time an event-time timer fires (1s after a record's timestamp).
     *
     * <p>Note: this constructor blocks inside {@code env.execute(...)} until
     * the job terminates.
     */
    public StreamOfEnterAnalysis() {
        env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "139.198.108.148:9092");
        properties.put("group.id", "console-consumer-8102");
        // BUGFIX: these consumer properties must name Deserializer classes; the
        // original pointed them at StringSerializer. (Flink's SimpleStringSchema
        // performs the actual decoding, but the properties should be correct.)
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        consumer = new FlinkKafkaConsumer010<>(SZTConfig.TOPIC, new SimpleStringSchema(), properties);
        // BUGFIX: the original called setStartFromEarliest() immediately
        // followed by setStartFromGroupOffsets(); only the last call takes
        // effect, so the dead first call was removed.
        consumer.setStartFromGroupOffsets();

        DataStream<String> dataStream = env.addSource(consumer);
        SingleOutputStreamOperator<Swipe> swipeDataStream = dataStream
                .map(jsonString -> JSONObject.toJavaObject(JSONObject.parseObject(jsonString), Swipe.class))
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Swipe>() {
                    // SimpleDateFormat is not thread-safe, but each extractor
                    // instance is driven by a single task thread, so one cached
                    // instance is safe here.
                    private final SimpleDateFormat sdf = new SimpleDateFormat(DEAL_DATE_PATTERN);

                    @Override
                    public long extractAscendingTimestamp(Swipe ele) {
                        try {
                            return sdf.parse(ele.getDeal_date()).getTime();
                        } catch (ParseException e) {
                            e.printStackTrace();
                            // NOTE(review): falling back to wall-clock time can
                            // violate the ascending-timestamp assumption on a
                            // malformed record; kept for compatibility with the
                            // original behavior.
                            return new Date().getTime();
                        }
                    }
                });

        // 3.3.1 - Shenzhen metro station-entry ranking:
        swipeDataStream.filter(swipe -> DealType.ENTER.getName().equalsIgnoreCase(swipe.getDeal_type()))
                .keyBy(Swipe::getStation)
                .process(new KeyedProcessFunction<String, Swipe, StationCount>() {
                    /**
                     * Per-key state: station name -> (date "yyyy-MM-dd" -> entry count), e.g.
                     * ChangLong: { 2021-09-17: 200000, 2021-09-16: 20000099, ... }
                     * The stream is keyed by station, so each key's map holds
                     * only its own station entry.
                     */
                    MapState<String, HashMap<String, Long>> latestState = null;

                    // Cached formatter for deal_date; single-threaded per
                    // operator instance, so reuse is safe (and avoids the
                    // original per-record allocation).
                    SimpleDateFormat sdf = null;

                    @Override
                    @SuppressWarnings({"unchecked", "rawtypes"})
                    public void open(Configuration config) throws Exception {
                        sdf = new SimpleDateFormat(DEAL_DATE_PATTERN);
                        // Raw descriptor kept deliberately: the value type is a
                        // parameterized HashMap, which Class tokens cannot express.
                        latestState = getRuntimeContext().getMapState(
                                new MapStateDescriptor("latestState", String.class, HashMap.class)
                        );
                    }

                    @Override
                    public void processElement(Swipe swipeEle, Context ctx, Collector<StationCount> out) throws Exception {
                        // Current station, e.g. ChangLong.
                        String station = swipeEle.getStation();
                        // Date portion of the record, "yyyy-MM-dd".
                        String date = swipeEle.getDeal_date().split(" ")[0];

                        // Per-station map of date -> entry count from state.
                        HashMap<String, Long> stationDateCount =
                                latestState.contains(station) ? latestState.get(station) : new HashMap<>(23);

                        // Increment this station's counter for the record's day.
                        stationDateCount.merge(date, 1L, Long::sum);
                        latestState.put(station, stationDateCount);

                        // Register an event-time timer 1s after this record's
                        // timestamp, at which point counters are flushed.
                        long expire = sdf.parse(swipeEle.getDeal_date()).getTime() + 1000L;
                        ctx.timerService().registerEventTimeTimer(expire);
                    }

                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<StationCount> out) throws Exception {
                        // Build a human-readable snapshot of the counters for logging.
                        StringBuilder resultBuilder = new StringBuilder();
                        resultBuilder.append("==================").append("\r\n");
                        resultBuilder.append("统计时间:").append(new Date(timestamp)).append("\r\n");
                        latestState.keys().forEach(stationName -> {
                            try {
                                // BUGFIX: the closing "]" after the count was
                                // missing in the original output.
                                resultBuilder.append("站点 [").append(stationName).append("]: 人流[")
                                        .append(latestState.get(stationName)).append("]").append("\r\n");
                            } catch (Exception exception) {
                                exception.printStackTrace();
                            }
                        });
                        resultBuilder.append("===============\r\n");

                        String station = ctx.getCurrentKey();
                        HashMap<String, Long> map = latestState.get(station);
                        if (map != null) {
                            // Persist one row per (station, date) counter snapshot.
                            map.forEach((deal_date, count) -> {
                                try {
                                    Db.use().insertForGeneratedKey(
                                            Entity.create("szt_station_datecount")
                                                    .set("station", station)
                                                    .set("deal_date", deal_date)
                                                    .set("count", count)
                                                    .set("deal_time", new Date(timestamp))
                                    );
                                } catch (SQLException sqlException) {
                                    sqlException.printStackTrace();
                                }
                            });
                        }

                        System.out.println(resultBuilder.toString());
                    }
                }).print("地铁站进站实时统计");

        try {
            env.execute("深圳地铁人流量实时统计");
        } catch (Exception exception) {
            exception.printStackTrace();
        }
    }

    /** Entry point: constructing the object builds and runs the Flink job. */
    public static void main(String[] args) {
        new StreamOfEnterAnalysis();
    }
}
